// Copyright (c) 2022 by Duguang.IO Inc. All Rights Reserved.
// Author: Ethan Liu
// Date: 2022-05-21 12:47:52

package runtime

import (
	"bytes"
	"context"
	"encoding/json"
	"jianmu-worker-kube/client"
	"jianmu-worker-kube/engine"
	"jianmu-worker-kube/livelog"
	"jianmu-worker-kube/types"
	"time"

	"github.com/hashicorp/go-multierror"

	logger "jianmu-worker-kube/logging"
)

// Worker accepts units from the CI server through Client and executes them
// on Kubernetes through Engine.
type Worker struct {
	// Client is the client used to talk to the CI server (accepting units,
	// updating task status, watching for cancellation, streaming logs).
	Client client.Client
	// Engine is the Kubernetes client used to create, run and delete pods.
	Engine *engine.Kubernetes
	// ID is the worker ID; it is passed to the live log and printed in the
	// task log banner.
	ID string
	// Name is the worker name printed in the task log banner.
	Name string
	// Type is the worker type printed in the task log banner.
	Type string
	// Namespace is the default namespace, applied to a unit whose pod spec
	// does not specify one.
	Namespace string
}

// Run accepts the unit's current runner on the CI server and then dispatches
// on the unit type: "RUN" executes the runner, "CREATE" creates the pod and
// "DELETE" deletes it.
//
// If the pod spec has no namespace, the worker's default namespace is filled
// in first. When the accept call fails with client.ErrOptimisticLock the unit
// is treated as taken by another worker and nil is returned. Any other accept
// error, and any error from the dispatched operation, is returned. A unit of
// an unknown type is logged and ignored.
func (w *Worker) Run(ctx context.Context, unit *engine.Unit) error {
	entry := logger.FromContext(ctx).WithField("unit.name", unit.PodSpec.Name)

	if unit.PodSpec.Namespace == "" {
		unit.PodSpec.Namespace = w.Namespace
	}

	if acceptErr := w.Client.Accept(ctx, unit.Current); acceptErr != nil {
		if acceptErr == client.ErrOptimisticLock {
			entry.Debug("runner accepted by another worker")
			return nil
		}
		entry.WithError(acceptErr).Error("cannot accept runner")
		return acceptErr
	}

	switch unit.Type {
	case "RUN":
		return w.run(ctx, unit, unit.Current)
	case "CREATE":
		return w.createPod(ctx, unit, unit.Current)
	case "DELETE":
		return w.deletePod(ctx, unit, unit.Current)
	}

	entry.Debugln("wrong format unit")
	return nil
}

// Resume runs every runner listed in unit.Runners, one after another, and
// stops at the first runner whose execution returns an error, returning that
// error. It returns nil when all runners complete without error.
func (w *Worker) Resume(ctx context.Context, unit *engine.Unit) error {
	for _, pending := range unit.Runners {
		runErr := w.run(ctx, unit, pending)
		if runErr != nil {
			return runErr
		}
	}
	return nil
}

// createPod creates the pod described by unit through the Kubernetes engine
// and reports the outcome to the server as the status of the task identified
// by runner.ID.
//
// A creation failure is not returned to the caller: it is reported to the
// server as a Failed task carrying the error text. Only a failure to deliver
// that status update is returned.
func (w *Worker) createPod(ctx context.Context, unit *engine.Unit, runner *engine.Runner) error {
	log := logger.FromContext(ctx).
		WithField("pod.name", unit.PodSpec.Name)

	task := &engine.Task{
		TaskId: runner.ID,
		Status: engine.Succeed,
	}

	if err := w.Engine.CreatePod(ctx, unit); err != nil {
		task.Status = engine.Failed
		// Keep the cause: attach it to the task and to the log entry instead
		// of dropping it.
		task.ErrorMsg = err.Error()
		log.WithError(err).Debugln("创建Pod失败")
	}

	if err := w.Client.Update(ctx, task); err != nil {
		log.WithError(err).Error("cannot update task status")
		return err
	}

	// Dump the runner as JSON for debugging; a marshalling failure is logged
	// but does not fail the operation.
	if s, e := json.Marshal(runner); e != nil {
		log.WithError(e).Errorln("序列化错误")
	} else {
		log.Debugln(string(s))
	}
	return nil
}

// deletePod deletes the pod described by unit through the Kubernetes engine
// and reports the outcome to the server as the status of the task identified
// by runner.ID.
//
// A deletion failure is not returned to the caller: it is reported to the
// server as a Failed task carrying the error text. Only a failure to deliver
// that status update is returned.
func (w *Worker) deletePod(ctx context.Context, unit *engine.Unit, runner *engine.Runner) error {
	log := logger.FromContext(ctx).
		WithField("pod.name", unit.PodSpec.Name)

	task := &engine.Task{
		TaskId: runner.ID,
		Status: engine.Succeed,
	}

	if err := w.Engine.DeletePod(ctx, unit); err != nil {
		task.Status = engine.Failed
		// Keep the cause: attach it to the task and to the log entry instead
		// of dropping it.
		task.ErrorMsg = err.Error()
		log.WithError(err).Debugln("删除Pod失败")
	}

	if err := w.Client.Update(ctx, task); err != nil {
		log.WithError(err).Error("cannot update task status")
		return err
	}

	// Dump the runner as JSON for debugging; a marshalling failure is logged
	// but does not fail the operation.
	if s, e := json.Marshal(runner); e != nil {
		log.WithError(e).Errorln("序列化错误")
	} else {
		log.Debugln(string(s))
	}
	return nil
}

// run drives a single runner: it starts a goroutine that watches the server
// for cancellation of the task, reports the task as Running, and then executes
// the runner synchronously through exec.
//
// Three contexts are derived:
//   - watchCtx (child of ctx) bounds the cancellation watcher;
//   - timeoutCtx (child of ctx) enforces the execution time limit;
//   - execCtx (child of timeoutCtx) is the context exec runs under and is
//     cancelled when the server reports the task as cancelled.
//
// Each context has its own cancel function and all of them are released when
// run returns, so neither the watcher goroutine nor the timeout timer outlives
// the call — including on the early-return path where the Running status
// cannot be delivered.
//
// It returns the error from the Running status update, or else whatever exec
// returns.
func (w *Worker) run(ctx context.Context, unit *engine.Unit, runner *engine.Runner) error {
	log := logger.FromContext(ctx).
		WithField("runner.id", runner.ID)

	watchCtx, stopWatch := context.WithCancel(ctx)
	defer stopWatch()

	// TODO: take the timeout from the task configuration in the future.
	timeout := 30 * time.Minute
	timeoutCtx, stopTimeout := context.WithTimeout(ctx, timeout)
	defer stopTimeout()

	execCtx, cancelExec := context.WithCancel(timeoutCtx)
	defer cancelExec()

	// Watch the server for cancellation of this task.
	go func() {
		// The watch error is deliberately ignored: only a positive "done"
		// answer triggers any action.
		done, _ := w.Client.Watch(watchCtx, runner.ID)

		// Once watchCtx is cancelled (run has returned, or the parent context
		// ended) the watch result is no longer acted upon, so a finished task
		// is never marked as failed or its pod deleted after the fact.
		if !done || watchCtx.Err() != nil {
			log.Debugln("done listening for cancellations")
			return
		}

		task := &engine.Task{
			TaskId:   runner.ID,
			Status:   engine.Failed,
			ExitCode: -1,
		}
		// The parent ctx is used here on purpose: execCtx is about to be
		// cancelled, and these clean-up calls must not be cancelled with it.
		if err := w.Client.Update(ctx, task); err != nil {
			log.WithError(err).Error("cannot update task status")
		}
		if err := w.Engine.DeletePod(ctx, unit); err != nil {
			log.WithError(err).Error("cannot clean task container")
		}
		cancelExec()
		log.Debugln("收到任务取消通知")
	}()

	task := &engine.Task{
		TaskId: runner.ID,
		Status: engine.Running,
	}
	if err := w.Client.Update(ctx, task); err != nil {
		log.WithError(err).Error("cannot update task status")
		return err
	}

	log.Debugln("updated task to running")
	// Dump the runner as JSON for debugging; a marshalling failure is logged
	// but does not prevent execution.
	if s, e := json.Marshal(runner); e != nil {
		log.WithError(e).Errorln("序列化错误")
	} else {
		log.Debugln(string(s))
	}

	return w.exec(logger.WithLogger(execCtx, log), unit, runner)
}

// exec runs one runner on the Kubernetes engine, streams its output to the
// server through a live log (with the runner's secrets passed to the replacer
// that wraps the log writer), and reports the final task status.
//
// Outcome handling, in order:
//   - if ctx has been cancelled or has timed out, no status is reported by
//     this function and nil is returned;
//   - if the engine returned an exit state, the task is reported as Succeed
//     when the reported exit code is 0 and Failed otherwise; an OOM kill is
//     always reported as Failed with exit code 137;
//   - otherwise, if the engine returned an error, the task is reported as
//     Failed with exit code -1 and the error text.
//
// The returned error is a failed status update if there was one; otherwise it
// is the error (if any) from closing the live log, except that nil is returned
// when the engine error is context.Canceled or context.DeadlineExceeded.
func (w *Worker) exec(ctx context.Context, unit *engine.Unit, runner *engine.Runner) error {
	var result error

	log := logger.FromContext(ctx).
		WithField("runner.id", runner.ID)

	// Banner written at the top of the task log describing the worker and
	// the image in use.
	var banner bytes.Buffer
	banner.WriteString("当前任务运行于Jianmu-worker ")
	banner.WriteString(w.Name)
	banner.WriteString("\n")
	banner.WriteString("worker类型为: ")
	banner.WriteString(w.Type)
	banner.WriteString("\n")
	banner.WriteString(" worker ID为: ")
	banner.WriteString(w.ID)
	banner.WriteString("\n")
	banner.WriteString("当前任务使用镜像为: ")
	banner.WriteString(runner.Image)
	banner.WriteString("\n")
	banner.WriteString("镜像下载策略为: ")
	banner.WriteString(runner.Pull.String())
	banner.WriteString("\n")

	// Named stream rather than livelog so the package name is not shadowed.
	stream := livelog.New(w.Client, w.ID, runner.ID)
	stream.Write(banner.Bytes())
	wc := newReplacer(stream, secretSlice(runner))
	exited, err := w.Engine.Run(ctx, unit, runner, wc)

	if err != nil {
		// Surface the engine error in the task log as well.
		stream.Write([]byte(err.Error()))
	}

	// Close the log stream.
	if closeErr := stream.Close(); closeErr != nil {
		result = multierror.Append(result, closeErr)
	}

	// A cancelled or timed-out context means the task was cancelled; no
	// status is reported from here.
	switch ctx.Err() {
	case context.Canceled, context.DeadlineExceeded:
		log.Debugln("任务取消")
		return nil
	}

	if exited != nil {
		task := &engine.Task{
			TaskId:     runner.ID,
			ResultFile: exited.ResultFile,
		}
		if exited.OOMKilled {
			log.Debugln("received oom kill.")
			task.ExitCode = 137
		} else {
			log.Debugf("received exit code %d", exited.ExitCode)
			task.ExitCode = exited.ExitCode
		}
		// Derive the status from the exit code that is actually reported, so
		// an OOM kill can never be reported as a success even if the raw exit
		// code happens to be 0.
		if task.ExitCode == 0 {
			task.Status = engine.Succeed
		} else {
			task.Status = engine.Failed
		}
		if err := w.Client.Update(ctx, task); err != nil {
			log.WithError(err).Error("cannot update task status")
			return err
		}
		return result
	}

	if err != nil {
		task := &engine.Task{
			TaskId:   runner.ID,
			Status:   engine.Failed,
			ExitCode: -1,
			ErrorMsg: err.Error(),
		}
		if err := w.Client.Update(ctx, task); err != nil {
			log.WithError(err).Error("cannot update task status")
			return err
		}
	}

	switch err {
	case context.Canceled, context.DeadlineExceeded:
		return nil
	}

	return result
}

// secretSlice collects the runner's secrets, in index order, into a slice.
// It returns nil when the runner reports no secrets.
func secretSlice(runner *engine.Runner) []types.Secret {
	// Read the length once and size the slice up front instead of growing it
	// on every append.
	n := runner.GetSecretLen()
	if n <= 0 {
		return nil
	}
	secrets := make([]types.Secret, 0, n)
	for i := 0; i < n; i++ {
		secrets = append(secrets, runner.GetSecretAt(i))
	}
	return secrets
}
